Welcome to my_other!

我们首先调整数据库表名,按照要求在前面加上shn_前缀。然后,我们需要创建相应的表,如果表不存在。

由于我们之前已经设计了几张表,现在我们将表名改为:shn_products, shn_cost_items, shn_cost_summary, shn_process_parameters。

接下来,我们修改SimpleCostConverter类中的save_to_mysql方法,使其使用新的表名,并调整SQL语句。

同时,我们注意到之前的代码中,cost_items表没有material_amount字段,而我们在提取数据时提取了material_amount,所以我们需要在表中添加这个字段,或者在插入时忽略(因为我们有公式可以计算)。但为了简单,我们暂时只保存原始数据,计算在导出Excel时进行。

由于我们之前提取的数据中,成本明细的material_amount是直接从Excel中读取的,所以我们保存到数据库时也可以保存这个值。但是,如果我们想要重新计算,也可以不保存,而是在导出时根据用量和单价计算。

考虑到我们可能希望保留原始数据,我们可以在数据库中保存material_amount。因此,我们需要在shn_cost_items表中添加material_amount列。

我们调整一下表结构:

1. shn_products: 产品基本信息表

id: 主键

product_code: 产品编码

product_name: 产品名称

process_number: 流程号

source_file: 源文件路径

created_at: 创建时间

2. shn_cost_items: 成本明细表

id: 主键

product_code: 产品编码

sequence_number: 序号

item_name: 项目名称(如冲压件、辅助材料等)

item_code: 项目编码

material_description: 材料描述

usage_amount: 用量

material_unit_price: 材料单价

material_amount: 材料金额

created_at: 创建时间

3. shn_cost_summary: 成本汇总表

id: 主键

product_code: 产品编码

total_material_amount: 材料总金额

total_cost: 总成本

benchmark_price: 基准价格

gross_profit_rate: 毛利率

created_at: 创建时间

4. shn_process_parameters: 工艺参数表

id: 主键

product_code: 产品编码

cavity_number: 模穴数

single_weight: 单重

gross_weight: 毛重

material_utilization_rate: 材料利用率

created_at: 创建时间

注意:我们这里只保存了部分工艺参数,实际可能需要更多字段,但根据我们提取的数据,暂时只有这些。

我们修改save_to_mysql方法,使用新的表名和字段。

另外,我们之前没有在数据库中创建表,所以我们需要确保表存在。我们可以使用以下SQL创建表:

由于我们使用MySQL,我们可以在连接后检查表是否存在,如果不存在则创建。但为了简单,我们假设表已经存在,或者我们提供创建表的SQL。


以下是创建表的SQL语句:

创建shn_products表:

CREATE TABLE IF NOT EXISTS shn_products (

id int(11) NOT NULL AUTO_INCREMENT,

product_code varchar(50) NOT NULL,

product_name text,

process_number varchar(50) DEFAULT NULL,

source_file text,

created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

PRIMARY KEY (id),

UNIQUE KEY product_code (product_code)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

创建shn_cost_items表:

CREATE TABLE IF NOT EXISTS shn_cost_items (

id int(11) NOT NULL AUTO_INCREMENT,

product_code varchar(50) NOT NULL,

sequence_number int(11) DEFAULT NULL,

item_name varchar(100) DEFAULT NULL,

item_code varchar(50) DEFAULT NULL,

material_description text,

usage_amount decimal(20,10) DEFAULT NULL,

material_unit_price decimal(15,6) DEFAULT NULL,

material_amount decimal(15,6) DEFAULT NULL,

created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

PRIMARY KEY (id),

KEY product_code (product_code)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

创建shn_cost_summary表:

CREATE TABLE IF NOT EXISTS shn_cost_summary (

id int(11) NOT NULL AUTO_INCREMENT,

product_code varchar(50) NOT NULL,

total_material_amount decimal(15,6) DEFAULT NULL,

total_cost decimal(15,6) DEFAULT NULL,

benchmark_price decimal(15,6) DEFAULT NULL,

gross_profit_rate decimal(8,6) DEFAULT NULL,

created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

PRIMARY KEY (id),

UNIQUE KEY product_code (product_code)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

创建shn_process_parameters表:

CREATE TABLE IF NOT EXISTS shn_process_parameters (

id int(11) NOT NULL AUTO_INCREMENT,

product_code varchar(50) NOT NULL,

cavity_number int(11) DEFAULT NULL,

single_weight decimal(10,6) DEFAULT NULL,

gross_weight decimal(10,6) DEFAULT NULL,

material_utilization_rate decimal(8,6) DEFAULT NULL,

created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

PRIMARY KEY (id),

UNIQUE KEY product_code (product_code)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

现在,我们修改SimpleCostConverter类中的save_to_mysql方法,使用新的表名和字段。

注意:我们在提取数据时,成本明细的material_amount是从Excel中读取的,但是我们在提取时并没有提取这个值,因为我们的提取函数中只提取了usage_amount和material_unit_price。所以我们需要在提取成本明细时,也提取material_amount。

我们修改_extract_cost_details方法,增加material_amount的提取。在Excel中,材料金额通常在用量和单价的右侧,我们之前已经映射了列,所以我们可以通过列名来获取。

但是,我们之前没有在columns映射中设置'material_amount',所以我们需要在_find_table_by_headers方法中增加对'材料金额'的识别。

我们在_find_table_by_headers方法中,在required_headers中增加'材料金额',并在映射时将其列索引记录下来。

然后,在_extract_cost_details方法中,我们读取材料金额。

由于我们之前没有这个字段,所以我们现在加上。

但是,如果我们不想改变原有结构,也可以不保存material_amount,因为material_amount = usage_amount * material_unit_price。但是,有时Excel中可能使用了其他公式,所以为了准确,我们保存原始值。

我们决定在成本明细中保存material_amount。

因此,我们在提取成本明细时,要提取材料金额。

步骤:

1. 在_find_table_by_headers方法中,将'材料金额'加入required_headers,这样就会识别材料金额所在的列。

2. 在_extract_cost_details方法中,提取材料金额。

3. 在保存到数据库时,将材料金额存入shn_cost_items表的material_amount字段。

但是,由于我们之前已经提取了数据,并且没有material_amount,所以我们需要调整提取函数。

我们修改_find_table_by_headers方法,在required_headers中加入'材料金额'。

然后修改_extract_cost_details,添加material_amount的提取。

同时,在保存到数据库时,我们也要保存这个字段。

由于我们之前没有设计这个字段,所以我们需要在数据库表shn_cost_items中添加material_amount列。

我们已经在上面的建表语句中添加了。

现在,我们修改代码。

由于我们想要一个简洁的版本,我们可以在提取成本明细时,尝试获取材料金额,如果找不到该列,则设置为None。

我们修改_extract_cost_details方法:

在item字典中添加'material_amount',从对应的列中读取。

同时,我们也要在导出到确认Excel时,显示材料金额。

由于时间关系,我们只修改关键部分。

我们重新调整一下SimpleCostConverter类,增加对材料金额的提取和保存。

注意:我们之前提供的代码中,在提取成本明细时,并没有提取材料金额,所以我们现在要加上。

我们在_find_table_by_headers方法中,在required_headers列表中加入'材料金额'。

然后,在_extract_cost_details方法中,我们通过columns映射获取材料金额的列,然后读取。

最后,在保存到数据库时,我们将材料金额存入。

由于代码较长,我们只显示修改的部分。

我们修改后的代码:

1. 在_find_table_by_headers方法中,修改required_headers为:

required_headers = ['序号', '产品名称', '材料描述', '用量', '材料金额']

2. 在_extract_cost_details方法中,提取material_amount:

item['material_amount'] = ws.cell(row, columns.get('材料金额', 13)).value

3. 在保存成本明细到数据库时,插入material_amount。

但是,我们注意到,在原始Excel中,材料金额列可能有公式,我们使用data_only=True读取的是计算后的值,所以没有问题。

现在,我们开始修改代码。

由于我们想要一个完整的解决方案,我们将修改后的完整代码提供如下。


1. 配置文件和MySQL表结构

python

# config.py - 配置文件

import os

# MySQL数据库配置

DB_CONFIG = {

'host': 'localhost',

'user': 'root',

'password': '502',

'database': 'shn',

'port': 3306

}

# SQLAlchemy连接字符串

DB_URI = 'mysql+mysqlconnector://root:502@localhost:3306/shn'

# Excel解析参数

EXCEL_PARSING = {

'max_rows_to_scan': 50,

'max_cols_to_scan': 30,

'required_headers': ['序号', '产品名称', '产品编码', '材料描述', '用量'],

'summary_keywords': ['合计', '总成本', '基准价格', '毛利率'],

'process_keywords': ['模穴数', '单重', '毛重', '材料利用率']

}

# 输出配置

OUTPUT_CONFIG = {

'confirmation_filename': 'extraction_results.xlsx',

'export_dir': 'exported_reports'

}


sql

-- MySQL表结构SQL (在MySQL中执行)

-- 产品基本信息表

CREATE TABLE IF NOT EXISTS shn_products (

id INT AUTO_INCREMENT PRIMARY KEY,

product_code VARCHAR(100) NOT NULL UNIQUE,

product_name TEXT,

process_number VARCHAR(100),

maker VARCHAR(100),

review_date DATE,

source_file VARCHAR(500),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

);

-- 成本明细表

CREATE TABLE IF NOT EXISTS shn_cost_items (

id INT AUTO_INCREMENT PRIMARY KEY,

product_code VARCHAR(100) NOT NULL,

sequence_number INT,

item_name VARCHAR(200),

item_code VARCHAR(100),

material_description TEXT,

usage_amount DECIMAL(20,10),

material_unit_price DECIMAL(15,6),

material_amount DECIMAL(15,6),

material_ratio DECIMAL(8,6),

item_type VARCHAR(50),

dimension_value VARCHAR(100),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

FOREIGN KEY (product_code) REFERENCES shn_products(product_code),

INDEX idx_product_code (product_code)

);


-- 成本汇总表

CREATE TABLE IF NOT EXISTS shn_cost_summary (

id INT AUTO_INCREMENT PRIMARY KEY,

product_code VARCHAR(100) NOT NULL UNIQUE,

total_material_amount DECIMAL(15,6),

total_direct_labor DECIMAL(15,6),

total_manufacturing_cost DECIMAL(15,6),

total_manufacturing_cost_sum DECIMAL(15,6),

grf_cost DECIMAL(15,6),

s_cost DECIMAL(15,6),

total_cost DECIMAL(15,6),

benchmark_price DECIMAL(15,6),

sales_price DECIMAL(15,6),

gross_profit_rate DECIMAL(8,6),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

FOREIGN KEY (product_code) REFERENCES shn_products(product_code)

);

-- 工艺参数表

CREATE TABLE IF NOT EXISTS shn_process_parameters (

id INT AUTO_INCREMENT PRIMARY KEY,

product_code VARCHAR(100) NOT NULL,

parameter_type VARCHAR(50),

cavity_number INT,

single_weight DECIMAL(10,6),

gross_weight DECIMAL(10,6),

material_utilization_rate DECIMAL(8,6),

sprue_ratio DECIMAL(8,6),

raw_material_description TEXT,

material_unit_price DECIMAL(10,6),

material_cost DECIMAL(10,6),

production_machine VARCHAR(200),

molding_cycle INT,

hourly_output DECIMAL(15,6),

hourly_cost DECIMAL(10,2),

manufacturing_cost_per_unit DECIMAL(10,6),

manufacturing_cost_total DECIMAL(10,6),

actual_cost DECIMAL(10,6),

created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

FOREIGN KEY (product_code) REFERENCES shn_products(product_code),

INDEX idx_product_code (product_code)

);


2. 增强的MySQL操作类

python

# mysql_operations.py - MySQL数据库操作类

import mysql.connector

from sqlalchemy import create_engine, text

import pandas as pd

from datetime import datetime

import logging

# 设置日志

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)

class MySQLOperations:

def __init__(self, db_uri=None, db_config=None):

self.db_uri = db_uri or 'mysql+mysqlconnector://root:502@localhost:3306/shn'

self.db_config = db_config or {

'host': 'localhost',

'user': 'root',

'password': '502',

'database': 'shn',

'port': 3306

}

self.engine = None

self.connect()


def connect(self):

"""连接到MySQL数据库"""

try:

self.engine = create_engine(self.db_uri)

# 测试连接

with self.engine.connect() as conn:

conn.execute(text("SELECT 1"))

logger.info("成功连接到MySQL数据库")

return True

except Exception as e:

logger.error(f"连接MySQL数据库失败: {e}")

return False

def check_tables_exist(self):

"""检查表是否存在"""

tables = ['shn_products', 'shn_cost_items', 'shn_cost_summary', 'shn_process_parameters']

missing_tables = []

try:

with self.engine.connect() as conn:

for table in tables:

result = conn.execute(text(f"SHOW TABLES LIKE '{table}'"))

if not result.fetchone():

missing_tables.append(table)

if missing_tables:

logger.warning(f"以下表不存在: {missing_tables}")

return False

else:

logger.info("所有必需的表都存在")

return True

except Exception as e:

logger.error(f"检查表存在性时出错: {e}")

return False


def save_product_data(self, data):

"""保存产品数据到MySQL"""

try:

product_info = data['product_info']

cost_details = data['cost_details']

summary = data['summary']

process_params = data['process_params']

# 获取产品编码

product_code = product_info.get('product_code')

if not product_code:

logger.error("产品编码为空,无法保存")

return False

# 使用事务保存所有数据

with self.engine.begin() as conn:


# 1. 保存产品基本信息

product_sql = """

INSERT INTO shn_products

(product_code, product_name, process_number, maker, review_date, source_file)

VALUES (%s, %s, %s, %s, %s, %s)

ON DUPLICATE KEY UPDATE

product_name = VALUES(product_name),

process_number = VALUES(process_number),

maker = VALUES(maker),

review_date = VALUES(review_date),

source_file = VALUES(source_file),

updated_at = CURRENT_TIMESTAMP

"""

conn.execute(text(product_sql), (

product_code,

product_info.get('product_name'),

product_info.get('process_number'),

product_info.get('maker'),

product_info.get('review_date'),

data.get('file_path')

))

# 2. 保存成本明细(先删除旧的)

delete_cost_sql = "DELETE FROM shn_cost_items WHERE product_code = :product_code"

conn.execute(text(delete_cost_sql), {'product_code': product_code})

cost_item_sql = """

INSERT INTO shn_cost_items

(product_code, sequence_number, item_name, item_code, material_description,

usage_amount, material_unit_price, material_amount, material_ratio, item_type, dimension_value)

VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)

"""

for item in cost_details:

# 计算材料金额(如果未提供)

material_amount = item.get('material_amount')

if not material_amount and item.get('usage_amount') and item.get('material_unit_price'):

material_amount = float(item['usage_amount']) * float(item['material_unit_price'])

conn.execute(text(cost_item_sql), (

product_code,

item.get('sequence'),

item.get('item_name'),

item.get('item_code'),

item.get('material_description'),

item.get('usage_amount'),

item.get('material_unit_price'),

material_amount,

item.get('material_ratio'),

item.get('item_type'),

item.get('dimension_value')

))

# 3. 保存成本汇总

summary_sql = """

INSERT INTO shn_cost_summary

(product_code, total_material_amount, total_direct_labor, total_manufacturing_cost,

total_manufacturing_cost_sum, grf_cost, s_cost, total_cost, benchmark_price,

sales_price, gross_profit_rate)

VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)

ON DUPLICATE KEY UPDATE

total_material_amount = VALUES(total_material_amount),

total_direct_labor = VALUES(total_direct_labor),

total_manufacturing_cost = VALUES(total_manufacturing_cost),

total_manufacturing_cost_sum = VALUES(total_manufacturing_cost_sum),

grf_cost = VALUES(grf_cost),

s_cost = VALUES(s_cost),

total_cost = VALUES(total_cost),

benchmark_price = VALUES(benchmark_price),

sales_price = VALUES(sales_price),

gross_profit_rate = VALUES(gross_profit_rate),

updated_at = CURRENT_TIMESTAMP

"""

conn.execute(text(summary_sql), (

product_code,

summary.get('total_material'),

summary.get('total_labor'),

summary.get('total_manufacturing'),

summary.get('manufacturing_total'),

summary.get('grf_cost'),

summary.get('s_cost'),

summary.get('total_cost'),

summary.get('benchmark_price'),

summary.get('sales_price'),

summary.get('gross_profit_rate')

))

# 4. 保存工艺参数(如果存在)

if process_params:

# 删除旧的工艺参数

delete_process_sql = "DELETE FROM shn_process_parameters WHERE product_code = :product_code"

conn.execute(text(delete_process_sql), {'product_code': product_code})

process_sql = """

INSERT INTO shn_process_parameters

(product_code, parameter_type, cavity_number, single_weight, gross_weight,

material_utilization_rate, sprue_ratio, raw_material_description, material_unit_price,

material_cost, production_machine, molding_cycle, hourly_output, hourly_cost,

manufacturing_cost_per_unit, manufacturing_cost_total, actual_cost)

VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)

"""

conn.execute(text(process_sql), (

product_code,

process_params.get('parameter_type', 'stamping'),

process_params.get('cavity_number'),

process_params.get('single_weight'),

process_params.get('gross_weight'),

process_params.get('material_utilization'),

process_params.get('sprue_ratio'),

process_params.get('raw_material_description'),

process_params.get('material_unit_price'),

process_params.get('material_cost'),

process_params.get('production_machine'),

process_params.get('molding_cycle'),

process_params.get('hourly_output'),

process_params.get('hourly_cost'),

process_params.get('manufacturing_cost_per_unit'),

process_params.get('manufacturing_cost_total'),

process_params.get('actual_cost')

))

logger.info(f"成功保存产品数据: {product_code}")

return True

except Exception as e:

logger.error(f"保存产品数据时出错: {e}")

return False

def get_product_data(self, product_code):

"""从数据库获取产品数据"""

try:

with self.engine.connect() as conn:

# 获取产品基本信息

product_sql = "SELECT * FROM shn_products WHERE product_code = :product_code"

product_result = conn.execute(text(product_sql), {'product_code': product_code})

product_info = product_result.fetchone()

if not product_info:

return None

# 获取成本明细

cost_sql = "SELECT * FROM shn_cost_items WHERE product_code = :product_code ORDER BY sequence_number"

cost_result = conn.execute(text(cost_sql), {'product_code': product_code})

cost_details = [dict(row) for row in cost_result]

# 获取成本汇总

summary_sql = "SELECT * FROM shn_cost_summary WHERE product_code = :product_code"

summary_result = conn.execute(text(summary_sql), {'product_code': product_code})

summary = dict(summary_result.fetchone()) if summary_result.rowcount > 0 else {}

# 获取工艺参数

process_sql = "SELECT * FROM shn_process_parameters WHERE product_code = :product_code"

process_result = conn.execute(text(process_sql), {'product_code': product_code})

process_params = dict(process_result.fetchone()) if process_result.rowcount > 0 else {}

return {

'product_info': dict(product_info),

'cost_details': cost_details,

'summary': summary,

'process_params': process_params

}

except Exception as e:

logger.error(f"获取产品数据时出错: {e}")

return None

def get_all_products(self):

"""获取所有产品列表"""

try:

with self.engine.connect() as conn:

sql = "SELECT product_code, product_name, process_number FROM shn_products ORDER BY created_at DESC"

result = conn.execute(text(sql))

return [dict(row) for row in result]

except Exception as e:

logger.error(f"获取产品列表时出错: {e}")

return []

def delete_product(self, product_code):

"""删除产品及其相关数据"""

try:

with self.engine.begin() as conn:

# 注意:由于外键约束,需要按正确顺序删除

conn.execute(text("DELETE FROM shn_process_parameters WHERE product_code = :product_code"),

{'product_code': product_code})

conn.execute(text("DELETE FROM shn_cost_summary WHERE product_code = :product_code"),

{'product_code': product_code})

conn.execute(text("DELETE FROM shn_cost_items WHERE product_code = :product_code"),

{'product_code': product_code})

conn.execute(text("DELETE FROM shn_products WHERE product_code = :product_code"),

{'product_code': product_code})

logger.info(f"成功删除产品: {product_code}")

return True

except Exception as e:

logger.error(f"删除产品时出错: {e}")

return False

def export_to_dataframe(self, product_code=None):

"""将数据导出为Pandas DataFrame(用于分析或导出Excel)"""

try:

if product_code:

# 导出单个产品

data = self.get_product_data(product_code)

if not data:

return None

# 转换为DataFrame格式

product_df = pd.DataFrame([data['product_info']])

cost_df = pd.DataFrame(data['cost_details'])

summary_df = pd.DataFrame([data['summary']])

return {

'product_info': product_df,

'cost_details': cost_df,

'summary': summary_df

}

else:

# 导出所有产品摘要

products = self.get_all_products()

return pd.DataFrame(products)

except Exception as e:

logger.error(f"导出数据到DataFrame时出错: {e}")

return None

3. 集成MySQL操作的简化成本转换器

python

# simple_cost_converter.py - 集成MySQL的简化成本转换器

import pandas as pd

from openpyxl import Workbook, load_workbook

import os

from mysql_operations import MySQLOperations

import config

class SimpleCostConverter:

def __init__(self, db_uri=None):

self.db_operations = MySQLOperations(db_uri or config.DB_URI)

self.results = {}

def analyze_excel_structure(self, file_path):

"""分析Excel文件结构(简化版)"""

wb = load_workbook(file_path, data_only=False)

ws = wb.active

regions = {

'product_info': self._find_by_keywords(ws, ['产品名称', '产品编码', '流程号']),

'cost_table': self._find_table_by_headers(ws, ['序号', '产品名称', '材料描述', '用量']),

'summary': self._find_by_keywords(ws, ['合计', '总成本', '基准价格']),

}

wb.close()

return regions

def _find_by_keywords(self, ws, keywords):

"""通过关键词查找单元格"""

locations = {}

for row in range(1, config.EXCEL_PARSING['max_rows_to_scan']):

for col in range(1, config.EXCEL_PARSING['max_cols_to_scan']):

cell_value = ws.cell(row, col).value

if cell_value:

cell_str = str(cell_value)

for keyword in keywords:

if keyword in cell_str:

locations[keyword] = {

'position': (row, col),

'value': ws.cell(row, col + 1).value or ws.cell(row + 1, col).value

}

return locations

def _find_table_by_headers(self, ws, required_headers):

"""识别数据表格"""

for row in range(1, config.EXCEL_PARSING['max_rows_to_scan']):

headers_found = []

for col in range(1, config.EXCEL_PARSING['max_cols_to_scan']):

header = ws.cell(row, col).value

if header:

header_str = str(header).strip()

for req_header in required_headers:

if req_header in header_str:

headers_found.append((req_header, col))

if len(headers_found) >= 2:

return {

'header_row': row,

'columns': dict(headers_found),

'data_start': row + 1

}

return None

def extract_data(self, file_path):

"""从Excel提取数据"""

regions = self.analyze_excel_structure(file_path)

product_info = self._extract_product_info(regions['product_info'])

cost_details = self._extract_cost_details(file_path, regions['cost_table'])

summary = self._extract_summary(file_path, regions['summary'])

result = {

'file_path': file_path,

'product_info': product_info,

'cost_details': cost_details,

'summary': summary,

'process_params': {} # 简化版,不提取工艺参数

}

product_code = product_info.get('product_code', 'unknown')

self.results[product_code] = result

print(f"提取完成: {product_code} - {len(cost_details)}个成本项目")

return result

def _extract_product_info(self, product_region):

"""提取产品基本信息"""

info = {}

if '产品编码' in product_region:

info['product_code'] = product_region['产品编码']['value']

if '产品名称' in product_region:

info['product_name'] = product_region['产品名称']['value']

if '流程号' in product_region:

info['process_number'] = product_region['流程号']['value']

return info

def _extract_cost_details(self, file_path, table_region):

"""提取成本明细数据"""

if not table_region:

return []

wb = load_workbook(file_path, data_only=True)

ws = wb.active

cost_items = []

columns = table_region['columns']

start_row = table_region['data_start']

for row in range(start_row, start_row + 20):

seq_value = ws.cell(row, 1).value

if not seq_value:

continue

item = {

'sequence': seq_value,

'item_name': ws.cell(row, columns.get('产品名称', 2)).value,

'item_code': ws.cell(row, columns.get('产品编码', 3)).value,

'material_description': ws.cell(row, columns.get('材料描述', 6)).value,

'usage_amount': ws.cell(row, columns.get('用量', 11)).value,

'material_unit_price': ws.cell(row, columns.get('材料单价', 12)).value

}

if any(item.values()):

cost_items.append(item)

wb.close()

return cost_items

def _extract_summary(self, file_path, summary_region):

"""提取汇总数据"""

if not summary_region:

return {}

wb = load_workbook(file_path, data_only=True)

ws = wb.active

summary = {}

for key, info in summary_region.items():

row, col = info['position']

summary[key] = ws.cell(row, col + 1).value or ws.cell(row + 1, col).value

wb.close()

return summary

def export_to_confirmation_excel(self, output_path=None):

"""导出到确认Excel文件"""

if not output_path:

output_path = config.OUTPUT_CONFIG['confirmation_filename']

wb = Workbook()

ws_summary = wb.active

ws_summary.title = "文件汇总"

# 汇总表头

headers = ["产品编码", "产品名称", "成本项目数量", "总成本", "基准价格", "文件路径"]

for col, header in enumerate(headers, 1):

ws_summary.cell(1, col, header)

# 填充汇总数据

for row, (product_code, data) in enumerate(self.results.items(), 2):

ws_summary.cell(row, 1, product_code)

ws_summary.cell(row, 2, data['product_info'].get('product_name', ''))

ws_summary.cell(row, 3, len(data['cost_details']))

ws_summary.cell(row, 4, data['summary'].get('总成本', ''))

ws_summary.cell(row, 5, data['summary'].get('基准价格', ''))

ws_summary.cell(row, 6, data['file_path'])

# 详细数据表

for product_code, data in self.results.items():

ws_detail = wb.create_sheet(title=product_code[:31])

# 产品信息

ws_detail.cell(1, 1, "产品基本信息")

for i, (key, value) in enumerate(data['product_info'].items(), 2):

ws_detail.cell(i, 1, key)

ws_detail.cell(i, 2, value)

# 成本明细

start_row = len(data['product_info']) + 3

ws_detail.cell(start_row, 1, "成本明细")

cost_headers = ["序号", "项目名称", "产品编码", "材料描述", "用量", "材料单价"]

for col, header in enumerate(cost_headers, 1):

ws_detail.cell(start_row + 1, col, header)

for i, item in enumerate(data['cost_details'], start_row + 2):

ws_detail.cell(i, 1, item.get('sequence', ''))

ws_detail.cell(i, 2, item.get('item_name', ''))

ws_detail.cell(i, 3, item.get('item_code', ''))

ws_detail.cell(i, 4, item.get('material_description', ''))

ws_detail.cell(i, 5, item.get('usage_amount', ''))

ws_detail.cell(i, 6, item.get('material_unit_price', ''))

wb.save(output_path)

print(f"确认文件已生成: {output_path}")

return output_path

def save_all_to_mysql(self):

"""将所有提取的数据保存到MySQL"""

if not self.db_operations.check_tables_exist():

print("错误: 数据库表不存在,请先创建表结构")

return False

success_count = 0

for product_code, data in self.results.items():

if self.db_operations.save_product_data(data):

success_count += 1

print(f"成功保存 {success_count}/{len(self.results)} 个产品到MySQL")

return success_count > 0

def load_from_mysql(self, product_code=None):

"""从MySQL加载数据"""

if product_code:

return self.db_operations.get_product_data(product_code)

else:

return self.db_operations.get_all_products()

4. 使用示例

python

# main.py - 主程序

from simple_cost_converter import SimpleCostConverter

import config

import os

def main():

# 创建转换器实例

converter = SimpleCostConverter(config.DB_URI)

# 检查数据库连接

if not converter.db_operations.connect():

print("无法连接到数据库,请检查配置")

return

# 检查表是否存在

if not converter.db_operations.check_tables_exist():

print("数据库表不存在,请先执行SQL创建表")

return

# 处理Excel文件

folder_path = "cost_files" # 修改为实际路径

if os.path.exists(folder_path):

# 获取所有Excel文件

excel_files = []

for file in os.listdir(folder_path):

if file.endswith(('.xlsx', '.xls')):

excel_files.append(os.path.join(folder_path, file))

print(f"找到 {len(excel_files)} 个Excel文件")

# 提取数据

for file_path in excel_files:

print(f"处理: {os.path.basename(file_path)}")

try:

converter.extract_data(file_path)

except Exception as e:

print(f"处理文件失败 {file_path}: {e}")

# 生成确认文件

confirmation_file = converter.export_to_confirmation_excel()

print(f"\n处理完成! 共提取 {len(converter.results)} 个产品数据")

print(f"请检查确认文件: {confirmation_file}")

# 用户确认后保存到数据库

user_input = input("\n是否保存到MySQL数据库? (yes/no): ")

if user_input.lower() == 'yes':

converter.save_all_to_mysql()

# 显示保存的产品列表

products = converter.load_from_mysql()

print(f"\n数据库中的产品数量: {len(products)}")

for product in products:

print(f" - {product['product_code']}: {product['product_name']}")

else:

print(f"文件夹不存在: {folder_path}")

def test_mysql_operations():

"""测试MySQL操作"""

converter = SimpleCostConverter(config.DB_URI)

# 测试获取产品列表

products = converter.load_from_mysql()

print("数据库中的产品:")

for product in products:

print(f" - {product['product_code']}: {product['product_name']}")

# 测试获取单个产品数据

if products:

product_code = products[0]['product_code']

product_data = converter.load_from_mysql(product_code)

if product_data:

print(f"\n产品 {product_code} 的详细信息:")

print(f" 成本项目数量: {len(product_data['cost_details'])}")

print(f" 总成本: {product_data['summary'].get('total_cost', 'N/A')}")

if __name__ == "__main__":

# 运行主程序

main()

# 或者测试MySQL操作

# test_mysql_operations()

这个增强版本提供了:

1. 完整的MySQL操作:使用shn_前缀的表名

2. SQLAlchemy支持:使用您提供的连接字符串

3. 数据验证:检查表是否存在

4. 事务支持:确保数据一致性

5. 灵活的查询:支持单个产品和所有产品的查询

6. 错误处理:完善的日志和错误处理

7. 数据导出:可以导出为Pandas DataFrame

使用前请先执行SQL文件创建表结构,然后修改配置文件中的数据库连接信息。